#include "consumer_aliyun.h"

#include "business_def.h"
#include "verifyforcontrolcache.h"
#include "pfunc.h"
#include "cJSON.h"
#include "Log.h"
#ifdef WIN32
#define usleep(x) Sleep(x)
#endif

ConsumerAliyun::ConsumerAliyun(std::map<std::string, AliyunDeviceMaps> subTopicMaps_)
	: running(true)
	, subTopicMaps(subTopicMaps_)
	, ptr_CacheDataObj(BusinessDef::getInstance())
	, is_Verification(false)
	, ptr_vcc(VerifyForControlCache::getInstance())
	, to_gather_queue(QueueDataSingle<DataToGather>::getInstance())
	, cache_from_aliyun_queue(QueueDataSingle<RCacheAliyun>::getInstance())
{
	cache_from_aliyun_queue->setQueueDesc("queue_from_aliyun");
	//
	is_Verification = ptr_CacheDataObj->getVerificationFunc();
}

ConsumerAliyun::~ConsumerAliyun()
{
	running = false;
}


void* ConsumerAliyun::run()
{
	while (running)
	{
		receive();
		usleep(100);
	}
	return 0;
}

void ConsumerAliyun::receive()
{
	RCacheAliyun item_cache;
	if (cache_from_aliyun_queue->pop(item_cache))
	{
		Print_NOTICE("topic:%s\npayload:%s\n", item_cache.topic.c_str(), item_cache.payload.c_str());
		std::map<std::string, AliyunDeviceMaps>::iterator it = subTopicMaps.find(item_cache.topic);
		if (it != subTopicMaps.end())
		{
			//printf("********************&********************\n");
			try {
				cJSON *request_root = NULL;
				request_root = cJSON_Parse(item_cache.payload.c_str());
				if (request_root == NULL) {
					Print_WARN("JSON Parse Error\n");
					return;
				}
				if (!cJSON_IsObject(request_root)) {
					Print_WARN("JSON isn't Object Error\n");
					cJSON_Delete(request_root);
					return;
				}
				cJSON *item_propertyid = NULL, *item_params = NULL;
				item_propertyid = cJSON_GetObjectItem(request_root, "params");
				if (item_propertyid != NULL && cJSON_IsObject(item_propertyid))
				{
					//EXAMPLE_TRACE("Property text:%s Value: %s"
					//	,item_propertyid->string, item_propertyid->valuestring);
					for (int j = 0; j < cJSON_GetArraySize(item_propertyid); j++)
					{
						item_params = cJSON_GetArrayItem(item_propertyid, j);
						if (item_params != NULL && cJSON_IsNumber(item_params))
						{
							Print_NOTICE("Property ID, index: %d, text:%s, Value: %.2f\n"
								, j, item_params->string, item_params->valuedouble);
							std::map<std::string, unsigned int>::iterator itp = it->second.aliyun_keys.find(item_params->string);
							if (itp != it->second.aliyun_keys.end())
							{
								createControlCmd(it->second.id, itp->second, static_cast<float>(item_params->valuedouble));
							}
						}
						if (item_params != NULL && cJSON_IsBool(item_params))
						{
							Print_NOTICE("Property ID, index: %d, text:%s, Value: %d\n"
								, j, item_params->string, (item_params->valuedouble > 0 ? 1 : 0));
						}
						if (item_params != NULL && cJSON_IsString(item_params))
						{
							Print_NOTICE("Property ID, index: %d, text:%s, Value: %s\n"
								, j, item_params->string, item_params->valuestring);
						}
					}
				}
				cJSON_Delete(request_root);
			}
			catch (...) {
				CLogger::createInstance()->Log(MsgWarn
					, "ConsumerAliyun::receive() error!");
			}
		}
	}
};

bool ConsumerAliyun::createControlCmd(unsigned long long devID, unsigned int pID, float val)
{
	unsigned long taskID = pyfree::getTaskIDFromDateTime();
	PFrom _pfrom;
	if (ptr_CacheDataObj->getFromInfo(devID, pID, _pfrom))
	{
		float _val = val;
		ptr_CacheDataObj->getCValue(devID, pID, _val);
		DataToGather wd(_pfrom.ipLong, OnSet, _pfrom.pID, _pfrom.pType, _val, 0, "Aliyun_IOT_Control", taskID);
		to_gather_queue->add(wd);

		CLogger::createInstance()->Log(MsgInfo
			, "TaskID[%lu] and down_node[1] setPValue from Dev Server,time(%s)"
			",devID(%ld),pID(%ld),val(%.3f),"
			"down_control_map ip[%s],pID(%d),pType(%d),val(%.2f)"
			, taskID
			, pyfree::getCurrentTime().c_str()
			, devID, pID, val
			, _pfrom.ipStr.c_str()
			, _pfrom.pID, static_cast<int>(_pfrom.pType), _val);

		if (is_Verification) {
			VerificationCache vit;
			vit.execTime = pyfree::getCurrentTimeByFormat("%04d%02d%02dT%02d%02d%02dZ");
			vit.taskID = taskID;
			vit.taskDesc = "JSon_Control";
			vit.devID = static_cast<unsigned long>(devID);
			vit.devDesc = _pfrom.devDesc;
			vit.pID = static_cast<unsigned long>(pID);
			vit.pDesc = _pfrom.pDesc;
			vit.pType = static_cast<unsigned int>(_pfrom.pType);
			vit.val = val;
			vit.limitTimeForCheck = static_cast<unsigned int>(time(NULL)) + 5;
			vit.eway_ = _pfrom.eway;
			ptr_vcc->addVerifyData(vit);
		}
	}
	else {
		//virtual ponit control
		PValueRet pret(val);
		ptr_CacheDataObj->setHValue(devID, pID, pret, true);
		CLogger::createInstance()->Log(MsgInfo
			, "TaskID[%lu] and down_node[0] setPValue from Dev Server,time(%s)"
			",devID(%ld),pID(%ld),val(%.3f)"
			",ditect set value to virtual ponit control"
			, taskID, pyfree::getCurrentTime().c_str()
			, devID, pID, pret.val_actual);
	}
	return true;
};
